package drds.plus.executor.cursor.cursor.impl.common;

import drds.plus.executor.ExecuteContext;
import drds.plus.executor.cursor.cursor.ISortingCursor;
import drds.plus.executor.cursor.cursor.IValueFilterCursor;
import drds.plus.executor.cursor.cursor.impl.DuplicateValueRowDataLinkedList;
import drds.plus.executor.cursor.cursor.impl.SortingCursor;
import drds.plus.executor.function.scalar.ScalarFunction;
import drds.plus.executor.record_codec.record.KeyValueRecordPair;
import drds.plus.executor.record_codec.record.Record;
import drds.plus.executor.row_values.RowValues;
import drds.plus.executor.utils.Utils;
import drds.plus.sql_process.abstract_syntax_tree.expression.item.function.Filter;
import drds.plus.sql_process.type.Type;
import drds.plus.util.GeneralUtil;

import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

/**
 * 用于做没法走索引的条件过滤
 */
public class ValueFilterCursor extends SortingCursor implements IValueFilterCursor {

    protected Filter filter;
    protected ExecuteContext executeContext;

    public ValueFilterCursor(ExecuteContext executeContext, ISortingCursor cursor, Filter filter) throws RuntimeException {

        super(cursor, cursor == null ? null : null, cursor == null ? null : cursor.getOrderByList());
        // 将filter中的函数用规则引擎里的，带实际
        this.filter = filter;
        this.executeContext = executeContext;

    }

    /**
     * 直到next匹配
     */
    public RowValues next() throws RuntimeException {
        RowValues rowData = null;
        while ((rowData = parentCursorNext()) != null) {
            if (match(rowData, filter)) {
                return rowData;
            }
        }
        return null;
    }

    public boolean skipTo(Record keyRecord) throws RuntimeException {
        if (super.skipTo(keyRecord)) {
            RowValues rowData = parentCursorCurrent();
            return rowData != null && match(rowData, filter);
        }
        return false;
    }

    public boolean skipTo(KeyValueRecordPair keyKeyValueRecordPair) throws RuntimeException {
        return skipTo(keyKeyValueRecordPair.getKey());
    }

    /**
     * first为不匹配则直到next匹配
     */
    public RowValues first() throws RuntimeException {
        RowValues rowData = parentCursorFirst();
        if (rowData != null) {
            do {
                if (rowData != null && match(rowData, filter)) {
                    return rowData;
                }
            } while ((rowData = parentCursorNext()) != null);
        }
        return null;
    }

    /**
     * last为不匹配则直到pre匹配
     */
    public RowValues last() throws RuntimeException {
        RowValues rowData = parentCursorLast();
        do {
            if (rowData != null && match(rowData, filter)) {
                return rowData;
            }
        } while ((rowData = parentCursorPrev()) != null);
        return null;
    }

    /**
     * 直到pre匹配
     */
    public RowValues prev() throws RuntimeException {
        RowValues rowData = parentCursorPrev();
        do {
            if (rowData != null && match(rowData, filter)) {
                return rowData;
            }
        } while ((rowData = parentCursorPrev()) != null);
        return null;
    }

    public Map<Record, DuplicateValueRowDataLinkedList> mgetRecordToDuplicateValueRowDataLinkedListMap(List<Record> keyRecordList, boolean prefixMatch, boolean keyFilterOrValueFilter) throws RuntimeException {
        Map<Record, DuplicateValueRowDataLinkedList> map = super.mgetRecordToDuplicateValueRowDataLinkedListMap(keyRecordList, prefixMatch, keyFilterOrValueFilter);
        if (map == null) {
            return null;
        }
        Map<Record, DuplicateValueRowDataLinkedList> cloneableRecordToDuplicateKeyValuePairMap = new HashMap<Record, DuplicateValueRowDataLinkedList>(map.size());
        Iterator<Entry<Record, DuplicateValueRowDataLinkedList>> iterator = map.entrySet().iterator();
        while (iterator.hasNext()) {
            Entry<Record, DuplicateValueRowDataLinkedList> entry = iterator.next();
            DuplicateValueRowDataLinkedList duplicateValueRowDataLinkedList = entry.getValue();
            duplicateValueRowDataLinkedList = allow(duplicateValueRowDataLinkedList, filter);
            if (duplicateValueRowDataLinkedList != null) {
                cloneableRecordToDuplicateKeyValuePairMap.put(entry.getKey(), duplicateValueRowDataLinkedList);
            }
        }
        return cloneableRecordToDuplicateKeyValuePairMap;
    }

    private DuplicateValueRowDataLinkedList allow(DuplicateValueRowDataLinkedList duplicateValueRowDataLinkedList, Filter filter) throws RuntimeException {
        if (filter == null) {
            return duplicateValueRowDataLinkedList;
        }
        // 链表头，第一个allow的DKV放在这里，如果所有都不allow，那么这里为空
        DuplicateValueRowDataLinkedList head = null;
        // 链表尾，用于append新元素
        DuplicateValueRowDataLinkedList tail = null;
        // 遍历用dkv
        DuplicateValueRowDataLinkedList current = duplicateValueRowDataLinkedList;
        if (match(current.rowData, filter)) {
            head = current;
            tail = current;
        }
        while ((current = duplicateValueRowDataLinkedList.next) != null) {
            if (match(current.rowData, filter)) {
                if (head == null) {
                    // 如果这是第一个满足要求的DKV.设置tail和root
                    head = current;
                    tail = current;
                } else {// 前面已经有满足要求的DVK了，设置tail.next并更新tail
                    tail.next = current;
                    tail = current;
                }
            }
        }
        if (tail != null && tail.next != null) {// 最后一个元素，可能有next，但最后一个元素next不满足要求
            tail.next = null;
        }
        return head;
    }

    boolean match(RowValues rowData, Filter filter) throws RuntimeException {
        if (filter == null) {
            return true;
        }

        Object result = ((ScalarFunction) filter.getExtraFunction()).scalarCalucate(this.executeContext, rowData);
        return Type.BooleanType.convert(result);

    }

    public String toStringWithInden(int inden) {
        StringBuilder sb = new StringBuilder();
        String subQueryTab = GeneralUtil.getTab(inden);
        sb.append(subQueryTab).append("【Value Filter Cursor : ").append("\n");
        sb.append(subQueryTab).append(filter).append("\n");
        Utils.printOrderByList(orderByList, inden, sb);
        sb.append(super.toStringWithInden(inden));
        return sb.toString();
    }

    public String toString() {
        return toStringWithInden(0);
    }
}
